package com.bw.utils;

import com.bw.bean.ShopCustomerMetric;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * Sink that prints every {@link ShopCustomerMetric} it receives to standard output
 * as a multi-line, human-readable report.
 *
 * <p>The report for one record is assembled in full and written with a single
 * {@code println} call, so that reports produced by concurrent callers sharing the
 * same {@code System.out} are not interleaved line by line.
 */
public class CustomerMetricSink implements SinkFunction<ShopCustomerMetric> {

    /**
     * Writes one metric record to standard output.
     *
     * @param metric  the record to print; its getters are read in the order the
     *                fields appear in the report
     * @param context sink context supplied by the runtime; not used here
     * @throws Exception declared by the {@link SinkFunction} contract
     */
    @Override
    public void invoke(ShopCustomerMetric metric, Context context) throws Exception {
        // The result could instead be written to the console, Kafka, HBase, Doris, etc.
        // Emit the whole report in one call: separate println calls per line would let
        // output from other threads slip in between the lines of a single record.
        System.out.println(formatReport(metric));
    }

    /** Builds the multi-line report text for a single metric (no trailing line break). */
    private static String formatReport(ShopCustomerMetric metric) {
        return String.join(
                System.lineSeparator(),
                "===== 店铺客户指标 =====",
                "店铺ID: " + metric.getShopId(),
                "统计时间: " + metric.getStatTime(),
                "总客户数: " + metric.getTotalCustomer(),
                "新增客户数: " + metric.getNewCustomer(),
                "总消费金额: " + metric.getTotalConsumption(),
                "平均消费金额: " + metric.getAvgConsumption(),
                "客户留存率: " + metric.getCustomerRetentionRate(),
                "活跃客户数: " + metric.getActiveCustomer(),
                "=======================");
    }
}
